/*!
 * \file EventCaster.cpp
 * \project	WonderTrader
 *
 * \author Wesley
 * \date 2020/03/30
 *
 * \brief
 */
#include "EventNotifier.h"
#include "WtHelper.h"

#include "../Share/DLLHelper.hpp"
#include "../Share/TimeUtils.hpp"

#include "../Includes/WTSCollection.hpp"
#include "../Includes/WTSTradeDef.hpp"
#include "../Includes/WTSVariant.hpp"

#include "../WTSTools/WTSLogger.h"

#include <rapidjson/document.h>
#include <rapidjson/prettywriter.h>
#include <rapidjson/writer.h>
namespace rj = rapidjson;

USING_NS_WTP;

void on_mq_log(unsigned long id, const char* message, bool bServer) {}

EventNotifier::EventNotifier()
    : _mq_sid(0), _publisher(NULL), _stopped(false) {}

EventNotifier::~EventNotifier() {
    _stopped = true;
    if (_worker)
        _worker->join();

    _asyncio.stop();

    if (_remover && _mq_sid != 0)
        _remover(_mq_sid);
}

bool EventNotifier::init(WTSVariant* cfg) {
    if (!cfg->getBoolean("active"))
        return false;

    _url = cfg->getCString("url");
    std::string module = DLLHelper::wrap_module("WtMsgQue", "lib");
    //先看工作目录下是否有对应模块
    std::string dllpath = WtHelper::getCWD() + module;
    //如果没有,则再看模块目录,即dll同目录下
    if (!StdFile::exists(dllpath.c_str()))
        dllpath = WtHelper::getInstDir() + module;

    DllHandle dllInst = DLLHelper::load_library(dllpath.c_str());
    if (dllInst == NULL) {
        WTSLogger::error("MQ module {} loading failed", dllpath.c_str());
        return false;
    }

    _creator =
        (FuncCreateMQServer)DLLHelper::get_symbol(dllInst, "create_server");
    if (_creator == NULL) {
        DLLHelper::free_library(dllInst);
        WTSLogger::error("MQ module {} is not compatible", dllpath.c_str());
        return false;
    }

    _remover =
        (FuncDestroyMQServer)DLLHelper::get_symbol(dllInst, "destroy_server");
    _publisher =
        (FundPublishMessage)DLLHelper::get_symbol(dllInst, "publish_message");
    _register =
        (FuncRegCallbacks)DLLHelper::get_symbol(dllInst, "regiter_callbacks");

    //注册回调函数
    _register(on_mq_log);

    //创建一个MQServer
    _mq_sid = _creator(_url.c_str());

    WTSLogger::info("EventNotifier initialized with channel {}", _url.c_str());

    if (_worker == NULL) {
        boost::asio::io_service::work work(_asyncio);
        _worker.reset(new StdThread([this]() {
            while (!_stopped) {
                std::this_thread::sleep_for(std::chrono::milliseconds(2));
                _asyncio.run_one();
                // m_asyncIO.run();
            }
        }));
    }

    return true;
}

void EventNotifier::notify_log(const char* tag, const char* message) {
    if (_mq_sid == 0)
        return;

    std::string strTag = tag;
    std::string strMsg = message;
    _asyncio.post([this, strTag, strMsg]() {
        std::string data;
        {
            rj::Document root(rj::kObjectType);
            rj::Document::AllocatorType& allocator = root.GetAllocator();

            root.AddMember("tag", rj::Value(strTag.c_str(), allocator),
                           allocator);
            root.AddMember("time", TimeUtils::getLocalTimeNow(), allocator);
            root.AddMember("message", rj::Value(strMsg.c_str(), allocator),
                           allocator);

            rj::StringBuffer sb;
            rj::PrettyWriter<rj::StringBuffer> writer(sb);
            root.Accept(writer);

            data = sb.GetString();
        }

        if (_publisher)
            _publisher(_mq_sid, "LOG", data.c_str(),
                       (unsigned long)data.size());
    });
}

void EventNotifier::notify_event(const char* message) {
    if (_mq_sid == 0)
        return;

    std::string strMsg = message;
    _asyncio.post([this, strMsg]() {
        std::string data;
        {
            rj::Document root(rj::kObjectType);
            rj::Document::AllocatorType& allocator = root.GetAllocator();

            root.AddMember("time", TimeUtils::getLocalTimeNow(), allocator);
            root.AddMember("message", rj::Value(strMsg.c_str(), allocator),
                           allocator);

            rj::StringBuffer sb;
            rj::PrettyWriter<rj::StringBuffer> writer(sb);
            root.Accept(writer);

            data = sb.GetString();
        }
        if (_publisher)
            _publisher(_mq_sid, "GRP_EVENT", data.c_str(),
                       (unsigned long)data.size());
    });
}

void EventNotifier::notify(const char* trader, const char* message) {
    if (_mq_sid == 0)
        return;

    std::string strTrader = trader;
    std::string strMsg = message;
    _asyncio.post([this, strTrader, strMsg]() {
        std::string data;
        {
            rj::Document root(rj::kObjectType);
            rj::Document::AllocatorType& allocator = root.GetAllocator();

            root.AddMember("trader", rj::Value(strTrader.c_str(), allocator),
                           allocator);
            root.AddMember("time", TimeUtils::getLocalTimeNow(), allocator);
            root.AddMember("message", rj::Value(strMsg.c_str(), allocator),
                           allocator);

            rj::StringBuffer sb;
            rj::PrettyWriter<rj::StringBuffer> writer(sb);
            root.Accept(writer);

            data = sb.GetString();
        }
        if (_publisher)
            _publisher(_mq_sid, "TRD_NOTIFY", data.c_str(),
                       (unsigned long)data.size());
    });
}

void EventNotifier::notify(const char* trader, uint32_t localid,
                           const char* stdCode, WTSTradeInfo* trdInfo) {
    if (trdInfo == NULL || _mq_sid == 0)
        return;

    std::string strTrader = trader;
    std::string strCode = stdCode;
    trdInfo->retain();
    _asyncio.post([this, strTrader, strCode, localid, trdInfo]() {
        std::string data;
        tradeToJson(strTrader.c_str(), localid, strCode.c_str(), trdInfo, data);
        if (_publisher)
            _publisher(_mq_sid, "TRD_TRADE", data.c_str(),
                       (unsigned long)data.size());
        trdInfo->release();
    });
}

void EventNotifier::notify(const char* trader, uint32_t localid,
                           const char* stdCode, WTSOrderInfo* ordInfo) {
    if (ordInfo == NULL || _mq_sid == 0)
        return;

    std::string strTrader = trader;
    std::string strCode = stdCode;
    ordInfo->retain();
    _asyncio.post([this, strTrader, strCode, localid, ordInfo]() {
        std::string data;
        orderToJson(strTrader.c_str(), localid, strCode.c_str(), ordInfo, data);
        if (_publisher)
            _publisher(_mq_sid, "TRD_ORDER", data.c_str(),
                       (unsigned long)data.size());
    });
}

void EventNotifier::tradeToJson(const char* trader, uint32_t localid,
                                const char* stdCode, WTSTradeInfo* trdInfo,
                                std::string& output) {
    if (trdInfo == NULL) {
        output = "{}";
        return;
    }

    bool isLong = (trdInfo->getDirection() == WDT_LONG);
    bool isOpen = (trdInfo->getOffsetType() == WOT_OPEN);
    bool isToday = (trdInfo->getOffsetType() == WOT_CLOSETODAY);

    {
        rj::Document root(rj::kObjectType);
        rj::Document::AllocatorType& allocator = root.GetAllocator();

        root.AddMember("trader", rj::Value(trader, allocator), allocator);
        root.AddMember("time", TimeUtils::getLocalTimeNow(), allocator);
        root.AddMember("localid", localid, allocator);
        root.AddMember("code", rj::Value(stdCode, allocator), allocator);
        root.AddMember("islong", isLong, allocator);
        root.AddMember("isopen", isOpen, allocator);
        root.AddMember("istoday", isToday, allocator);

        root.AddMember("volume", trdInfo->getVolume(), allocator);
        root.AddMember("price", trdInfo->getPrice(), allocator);

        rj::StringBuffer sb;
        rj::PrettyWriter<rj::StringBuffer> writer(sb);
        root.Accept(writer);

        output = sb.GetString();
    }
}

void EventNotifier::orderToJson(const char* trader, uint32_t localid,
                                const char* stdCode, WTSOrderInfo* ordInfo,
                                std::string& output) {
    if (ordInfo == NULL) {
        output = "{}";
        return;
    }

    bool isLong = (ordInfo->getDirection() == WDT_LONG);
    bool isOpen = (ordInfo->getOffsetType() == WOT_OPEN);
    bool isToday = (ordInfo->getOffsetType() == WOT_CLOSETODAY);
    bool isCanceled = (ordInfo->getOrderState() == WOS_Canceled);

    {
        rj::Document root(rj::kObjectType);
        rj::Document::AllocatorType& allocator = root.GetAllocator();

        root.AddMember("trader", rj::Value(trader, allocator), allocator);
        root.AddMember("time", TimeUtils::getLocalTimeNow(), allocator);
        root.AddMember("localid", localid, allocator);
        root.AddMember("code", rj::Value(stdCode, allocator), allocator);
        root.AddMember("islong", isLong, allocator);
        root.AddMember("isopen", isOpen, allocator);
        root.AddMember("istoday", isToday, allocator);
        root.AddMember("canceled", isCanceled, allocator);

        root.AddMember("total", ordInfo->getVolume(), allocator);
        root.AddMember("left", ordInfo->getVolLeft(), allocator);
        root.AddMember("traded", ordInfo->getVolTraded(), allocator);
        root.AddMember("price", ordInfo->getPrice(), allocator);
        root.AddMember("state", rj::Value(ordInfo->getStateMsg(), allocator),
                       allocator);

        rj::StringBuffer sb;
        rj::PrettyWriter<rj::StringBuffer> writer(sb);
        root.Accept(writer);

        output = sb.GetString();
    }
}

void EventNotifier::notify_chart_index(uint64_t time, const char* straId,
                                       const char* idxName,
                                       const char* lineName, double val) {
    if (_mq_sid == 0)
        return;

    std::string sid = straId;
    std::string iname = idxName;
    std::string lname = lineName;
    _asyncio.post([this, time, sid, iname, lname, val]() {
        std::string data;
        {
            rj::Document root(rj::kObjectType);
            rj::Document::AllocatorType& allocator = root.GetAllocator();

            root.AddMember("strategy", rj::Value(sid.c_str(), allocator),
                           allocator);
            root.AddMember("index_name", rj::Value(iname.c_str(), allocator),
                           allocator);
            root.AddMember("line_name", rj::Value(lname.c_str(), allocator),
                           allocator);
            root.AddMember("time", time, allocator);
            root.AddMember("value", val, allocator);

            rj::StringBuffer sb;
            rj::PrettyWriter<rj::StringBuffer> writer(sb);
            root.Accept(writer);

            data = sb.GetString();
        }
        if (_publisher)
            _publisher(_mq_sid, "CHART_INDEX", data.c_str(),
                       (unsigned long)data.size());
    });
}

void EventNotifier::notify_chart_marker(uint64_t time, const char* straId,
                                        double price, const char* icon,
                                        const char* tag) {
    if (_mq_sid == 0)
        return;

    std::string sid = straId;
    std::string sIcon = icon;
    std::string sTag = tag;
    _asyncio.post([this, time, sid, sIcon, sTag, price]() {
        std::string data;
        {
            rj::Document root(rj::kObjectType);
            rj::Document::AllocatorType& allocator = root.GetAllocator();

            root.AddMember("strategy", rj::Value(sid.c_str(), allocator),
                           allocator);
            root.AddMember("icon", rj::Value(sIcon.c_str(), allocator),
                           allocator);
            root.AddMember("tag", rj::Value(sTag.c_str(), allocator),
                           allocator);
            root.AddMember("time", time, allocator);
            root.AddMember("price", price, allocator);

            rj::StringBuffer sb;
            rj::Writer<rj::StringBuffer> writer(sb);
            root.Accept(writer);

            data = sb.GetString();
        }
        if (_publisher)
            _publisher(_mq_sid, "CHART_MARKER", data.c_str(),
                       (unsigned long)data.size());
    });
}

void EventNotifier::notify_trade(const char* straId, const char* stdCode,
                                 bool isLong, bool isOpen, uint64_t curTime,
                                 double price, const char* userTag) {
    if (_mq_sid == 0)
        return;

    std::string sid = straId;
    std::string code = stdCode;
    std::string tag = userTag;
    _asyncio.post([this, sid, code, tag, isLong, isOpen, curTime, price]() {
        std::string data;
        {
            rj::Document root(rj::kObjectType);
            rj::Document::AllocatorType& allocator = root.GetAllocator();

            root.AddMember("strategy", rj::Value(sid.c_str(), allocator),
                           allocator);
            root.AddMember("code", rj::Value(code.c_str(), allocator),
                           allocator);
            root.AddMember("tag", rj::Value(tag.c_str(), allocator), allocator);
            root.AddMember("long", isLong, allocator);
            root.AddMember("open", isOpen, allocator);
            root.AddMember("time", curTime, allocator);
            root.AddMember("price", price, allocator);

            rj::StringBuffer sb;
            rj::Writer<rj::StringBuffer> writer(sb);
            root.Accept(writer);

            data = sb.GetString();
        }
        if (_publisher)
            _publisher(_mq_sid, "STRA_TRADE", data.c_str(),
                       (unsigned long)data.size());
    });
}